﻿/*
 * TcpMediaCasterConnection.cpp
 *
 *  Created on: 2016年6月21日
 *      Author: terry
 */

#include "TcpMediaCasterConnection.h"

#include "CLog.h"
#include "TStringUtil.h"
#include "Socket.h"
#include "MediaCasterTable.h"
#include <assert.h>
#include <StopWatch.h>
#include "TcpMediaCasterServer.h"
#include "MediaTransportHelper.h"
#include "MediaCasterConfig.h"


namespace av
{


TcpMediaCasterConnection::TcpMediaCasterConnection():
    m_streamState(av::STATE_STOPPED),
    m_header(),
    m_headerReady(),
    m_jamming(),
    m_maxCacheSize(),
    m_empty()
{
    m_maxCacheSize = 1024 * 1000;
}

TcpMediaCasterConnection::~TcpMediaCasterConnection()
{
}


int TcpMediaCasterConnection::onOpen()
{
    bufferevent* bev = getBufferEvent();
    bufferevent_setwatermark(bev, EV_WRITE, 1024 * 16, m_maxCacheSize);

    //bufferevent_enable(bev, EV_WRITE);

    evutil_socket_t fd = bufferevent_getfd(bev);
    evutil_make_socket_nonblocking(fd);

    int value = TRUE;
    int ret = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&value, sizeof(value));

    comn::Socket sock;
    sock.attach(fd);
    if (MediaCasterConfig::s_sendBufSize > 0)
    {
        sock.setSendBufferSize(MediaCasterConfig::s_sendBufSize);
    }

    comn::SockAddr peerAddr = sock.getPeerName();
    m_peerID = comn::StringUtil::format("%s:%d", peerAddr.getIPAddr(), peerAddr.getPort());
    sock.detach();

    return 0;
}

void TcpMediaCasterConnection::onClose()
{
    CLog::debug("TcpMediaCasterConnection(%s) onClose stream%s\n",
        m_peerID.c_str(),
        m_streamName.c_str());
    
    CMediaCasterPtr caster;
    MediaCasterTable::instance().find(m_streamName, caster);
    if (caster)
    {
        av::MediaSinkPtr sink = shared_from_this();
        caster->removeSink(sink);
    }
}

void TcpMediaCasterConnection::onWrite(struct bufferevent *bev)
{
    return;

    MediaPacketPtr pkt;
    m_pktQueue.pop(pkt, -1);
    if (pkt)
    {
        writeMedia(pkt);
    }
    else
    {
        m_empty = true;

        bufferevent* bev = getBufferEvent();
        bufferevent_disable(bev, EV_WRITE);

        CLog::debug("Connection(%s) onWrite. packet queue is empty. \n", m_peerID.c_str());
    }

    return;

    
    while (true)
    {
        if (m_pktQueue.pop(pkt, -1))
        {
            break;
        }
    }

    if (pkt)
    {
        writeMedia(pkt);
    }
    else
    {
        CLog::debug("empty.\n");
    }

    if (isStreaming() && (m_streamState == av::STATE_PAUSED))
    {
        size_t length = evbuffer_get_length(bufferevent_get_output(bev));
        CLog::debug("TcpMediaCasterConnection(%s) resume from paused %d\n", m_peerID.c_str(), length);

        m_streamState = av::STATE_PLAYING;
    }

}

void TcpMediaCasterConnection::onRead(struct bufferevent *bev)
{
    size_t len = evbuffer_get_length(bufferevent_get_input(bev));
    if (!m_headerReady)
    {
        if (len >= sizeof(MediaTransportHeader))
        {
            len = bufferevent_read(bev, &m_header, sizeof(MediaTransportHeader));
            assert(len == sizeof(MediaTransportHeader));
            m_headerReady = true;

            len = evbuffer_get_length(bufferevent_get_input(bev));
        }
    }

    if (!m_headerReady)
    {
        return;
    }

    size_t dataLen = m_header.length;
    if (len < dataLen)
    {
        return;
    }

    m_jsonBuffer.ensure(dataLen);
    bufferevent_read(bev, m_jsonBuffer.data(), dataLen);
    m_headerReady = false;

    std::string json(m_jsonBuffer.c_str(), dataLen);
    Json::Value req;
    Json::Reader reader;
    if (!reader.parse(json, req))
    {
        CLog::warning("TcpMediaCasterConnection(%s). invalid request.\n", m_peerID.c_str());
        close();
        return;
    }

    try
    {
        Json::Value resp;
        bool success = handleCommand(m_header, req, resp);

        {
            json = comn::StringCast::toString(resp);
            
            MediaTransport::setCommand(m_header, kMediaCmdResp);
            m_header.length = json.size();
            m_header.subtype = success ? kMediaCmdResp : kMediaCmdError;

            int ret = bufferevent_write(bev, (char*)&m_header, sizeof(m_header));
            bufferevent_write(bev, json.c_str(), json.size());
        }

        if (!success)
        {
            CLog::warning("TcpMediaCasterConnection(%s). bad command.\n", m_peerID.c_str());

            close();
        }
    }
    catch (std::exception& ex)
    {
        CLog::warning("TcpMediaCasterConnection(%s). execption %s\n", m_peerID.c_str(), ex.what());
        close();
    }
}

void TcpMediaCasterConnection::onEvent(struct bufferevent *bev, short events)
{
    TcpConnection::onEvent(bev, events);
}

bool TcpMediaCasterConnection::handleCommand(const MediaTransportHeader& header,
                                      Json::Value& req,
                                      Json::Value& resp)
{
    bool ret = true;
    if (header.subtype == av::kMediaCmdPlay)
    {
        ret = handlePlay(header, req, resp);
    }
    else
    {
        ret = false;
    }
    return ret;
}

bool TcpMediaCasterConnection::handlePlay(const MediaTransportHeader& header, Json::Value& req, Json::Value& resp)
{
    m_streamState = av::STATE_PLAYING;

    std::string name = req["name"].asString();

    CMediaCasterPtr caster;
    MediaCasterTable::instance().find(name, caster);
    //if (!caster)
    {
        TcpMediaCasterServer* server = (TcpMediaCasterServer*)getServer();
        TcpConnectionPtr conn = shared_from_this();
        server->onAcquireStream(name, conn);

        MediaCasterTable::instance().find(name, caster);
    }
    
    if (!caster)
    {
        return false;
    }

    m_streamName = name;
    av::MediaSinkPtr sink = shared_from_this();
    caster->addSink(sink);

    av::MediaFormat fmt;
    caster->getFormat(fmt);
    
    MediaTransportHelper::toJson(fmt, resp);

    return true;
}

bool TcpMediaCasterConnection::isStreaming()
{
    return !m_streamName.empty();
}

void TcpMediaCasterConnection::onMediaFormat(const av::MediaFormat& fmt)
{
    m_format = fmt;
}

void TcpMediaCasterConnection::onMediaPacket(av::MediaPacketPtr& pkt)
{
    comn::StopWatch watch;
    writeMedia(pkt);

    uint64_t elapse = watch.elapse();
    if (elapse > 20)
    {
        char pktType = pkt->isVideo() ? 'V' : 'A';
        CLog::warning("TcpMediaCasterConnection::onMediaPacket elapse too much. pkt type:%c, elapse:%d\n", pktType, elapse);
    }
}

void TcpMediaCasterConnection::onMediaEvent(int event)
{

}

std::string TcpMediaCasterConnection::getStreamName()
{
    return m_streamName;
}

int TcpMediaCasterConnection::writeMedia(av::MediaPacketPtr& pkt)
{
    bufferevent* bev = getBufferEvent();
    if (!bev)
    {
        return 0;
    }

    MediaTransportHeader header;
    header.magic = MediaTransport::MAGIC;
    header.tag = kMediaData;
    header.subtype = pkt->type;
    header.length = pkt->size;
    header.timestamp = pkt->pts;
    header.duration = pkt->duration;
    header.flags = pkt->flags;

    uint8_t* buf = pkt->data;
    int buf_size = pkt->size;

    evbuffer * evbuf = bufferevent_get_output(bev);
    size_t length = evbuffer_get_length(evbuf);

    if (length > m_maxCacheSize)
    {
        if (m_jamming)
        {
            /// 拥塞中, 不追加
			size_t count = m_pktQueue.dropUntilKeyFrame();
			CLog::warning("evbuffer cache is too much. cache:%d, drop:%d\n", length, count);
        }
        else
        {
            m_jamming = true;
            CLog::warning("TcpMediaCasterConnection(%s) is jamming. buffer size: %d\n", m_peerID.c_str(), length);
            bufferevent_enable(bev, EV_WRITE);
        }
    }
    else if (length > m_maxCacheSize/4) 
    {
        if (m_jamming)
        {
            /// 拥塞中, 不追加
        }
        else
        {
            int ret = bufferevent_write(bev, (char*)&header, sizeof(header));
            bufferevent_write(bev, (char*)buf, buf_size);
        }
    }
    else
    {
        if (m_jamming)
        {
            m_jamming = false;
            CLog::warning("TcpMediaCasterConnection(%s) resume from jam: buffer size%d\n", m_peerID.c_str(), length);
        }

        int ret = bufferevent_write(bev, (char*)&header, sizeof(header));
        bufferevent_write(bev, (char*)buf, buf_size);
    }

    return buf_size;
}

void TcpMediaCasterConnection::getConfigSize()
{
}


} /* namespace av */
